use super::{RawTaskContext, TaskContext, TaskRef, ActiveWorker};
use crate::event::{Scheduler, Timer};
use core::future::Future;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::pin::Pin;
use core::task::{Context, Poll};
use core::time::Duration;
use hioff::container_of_mut;

/// Wrapper applied through the `#[future]` macro. Its purpose is to lift the
/// restrictions that come from Rust's automatic `Send` inference.
///
/// Even when a future merely runs several async functions one after another, it
/// cannot keep a non-`Send` value alive across a child `.await` (for example `Rc`
/// is rejected and `Arc` must be used instead). In addition, because of the way
/// async functions work, there is no equivalent of `thread::scope` for lifting
/// lifetime-related restrictions. Code that is functionally correct and free of
/// risk may therefore fail the `Send` and `'static` bounds; `#[future]` can be
/// used to remove those restrictions.
pub struct SendFuture<T: Future> {
    future: T,
}

// Asserted for every `T`; soundness rests on the contract documented on `SendFuture::new`.
unsafe impl<T: Future> Send for SendFuture<T> {}

impl<T: Future> SendFuture<T> {
    /// Wraps `future` so that it is treated as `Send`.
    ///
    /// # Safety
    /// The caller guarantees that `#[future]` is used correctly: the non-`Send`
    /// data held inside `future` is never accessed concurrently.
    pub unsafe fn new(future: T) -> Self {
        SendFuture { future }
    }
}

impl<T: Future> Future for SendFuture<T> {
    type Output = T::Output;

    /// Forwards the poll to the wrapped future unchanged.
    #[inline(always)]
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `future` is only ever reached through this pinned projection and
        // is never moved out of the wrapper.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.future) };
        inner.poll(ctx)
    }
}

/// One-shot future that runs a closure on its first poll and resolves with the
/// closure's return value.
///
/// The closure is kept in an `Option` so that it is dropped normally when the
/// future is discarded without ever being polled, and so that a poll after
/// completion is detected instead of re-reading a moved-out value.
pub(crate) struct FnOnceFuture<F, R>(Option<F>)
where
    F: FnOnce() -> R;

impl<F, R> FnOnceFuture<F, R>
where
    F: FnOnce() -> R,
{
    /// Wraps `f`; the closure is not invoked until the future is polled.
    pub(crate) fn new(f: F) -> Self {
        Self(Some(f))
    }
}

impl<F, R> Future for FnOnceFuture<F, R>
where
    F: FnOnce() -> R,
{
    type Output = R;

    /// Invokes the stored closure and completes immediately with its result.
    ///
    /// # Panics
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the closure is never pinned structurally; it is only moved out
        // by value exactly once, and nothing else in `Self` is address-sensitive.
        let this = unsafe { self.get_unchecked_mut() };
        let f = this
            .0
            .take()
            .expect("FnOnceFuture polled after completion");
        Poll::Ready(f())
    }
}

/// Timer-backed sleep future.
///
/// The first poll arms `timer` through the task context; the future completes
/// on a later poll once `timeout` has been overwritten with `Duration::MAX`.
pub(crate) struct Sleep {
    // Timer registered with / removed from the context; its callback is `Sleep::timer_handle`.
    timer: Timer,
    // Task reference stored while the timer is armed; taken again when it fires or is cancelled.
    waker: Option<TaskRef>,
    // Requested delay. `Duration::MAX` doubles as the "fired or cancelled" marker.
    timeout: Duration,
}

// NOTE(review): declared unconditionally although `timer` is handed out by reference and
// mapped back to the `Sleep` with `container_of_mut!`; presumably the runtime guarantees the
// value is not moved while the timer is armed — confirm.
impl Unpin for Sleep {}

// NOTE(review): asserted manually; the justification is not visible in this file — confirm
// against the `Timer` / `TaskRef` definitions.
unsafe impl Send for Sleep {}

impl Sleep {
    /// Creates a sleep for `timeout`. Nothing is armed here; the timer is only
    /// registered when the future is first polled.
    pub(crate) fn new(timeout: Duration) -> Self {
        Self {
            timer: Timer::new(Self::timer_handle),
            waker: None,
            timeout,
        }
    }

    /// Callback installed on `timer` by `new`: marks the sleep as elapsed and
    /// wakes the task that armed it.
    ///
    /// Panics if no task reference is stored, i.e. if it runs while the timer
    /// is not armed.
    fn timer_handle(timer: &Timer, sched: &mut dyn Scheduler) {
        // Recover the owning `Sleep` from its `timer` field. This relies on `timer`
        // really being the field of a live `Sleep` that has not moved since it was armed.
        let this = unsafe { container_of_mut!(timer, Self, timer) };
        let mut task = this.waker.take().unwrap();
        // Undo the `freeze_local` performed when the timer was armed.
        task.status.unfreeze_local();
        // `Duration::MAX` is the marker `poll` checks to report completion.
        this.timeout = Duration::MAX;
        task.fast_wake(unsafe { ActiveWorker::from_sched(sched) });
    }

    /// Disarms the timer if it is currently armed; does nothing otherwise.
    ///
    /// After cancelling, `timeout` is set to `Duration::MAX`, the stored task is
    /// unfrozen and handed back to the context via `cache`.
    fn cancel(&mut self, ctx: &mut Context<'_>) {
        if let Some(mut task) = self.waker.take() {
            unsafe { ctx.del_timer(&self.timer) };
            self.timeout = Duration::MAX;
            task.status.unfreeze_local();
            ctx.cache(task);
        }
    }
}

impl Future for Sleep {
    type Output = ();

    /// Arms the timer on the first poll and reports completion once the timer
    /// callback (or a cancellation) has replaced `timeout` with `Duration::MAX`.
    ///
    /// * When the context is aborted, the timer is cancelled and `Pending` is returned.
    /// * The delay is passed to `set_timer` as a `u32` count of microseconds. Delays
    ///   that do not fit are clamped to `u32::MAX` microseconds instead of wrapping
    ///   around, so an over-long sleep fires late-ish rather than almost immediately.
    /// * NOTE: because `Duration::MAX` is the completion marker, a sleep created with
    ///   `Duration::MAX` completes on its first poll.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if ctx.aborted() {
            self.cancel(ctx);
            return Poll::Pending;
        }

        if self.timeout == Duration::MAX {
            return Poll::Ready(());
        }
        if self.waker.is_none() {
            self.waker = Some(ctx.task_ref());
            // When a global resource is attached to task_waker.sched(), freeze_local must be
            // called before that resource is released, which guarantees the task is scheduled
            // on task_waker.sched() for the whole of that period.
            ctx.freeze_local();
            // Saturate instead of truncating: a plain `as u32` cast silently wraps for
            // delays longer than `u32::MAX` microseconds (about 71.6 minutes).
            let micros = self.timeout.as_micros().min(u128::from(u32::MAX)) as u32;
            unsafe { ctx.set_timer(&self.timer, micros) };
        }
        Poll::Pending
    }
}

/// Future that asks the context to yield once: the first poll calls `yield_now`
/// and returns `Pending`, every later poll returns `Ready`.
///
/// The wrapped flag records whether the yield has already been requested.
pub(crate) struct Yield(bool);

impl Yield {
    /// Creates a `Yield` that has not yielded yet.
    pub fn new() -> Self {
        Yield(false)
    }
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let already_yielded = self.0;
        if !already_yielded {
            cx.yield_now();
            self.0 = true;
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}

/// Future that calls `exit()` on the polling context and completes immediately.
pub(crate) struct Exit;

impl Future for Exit {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.exit();
        Poll::Ready(())
    }
}

/// Future that resolves immediately with the context's current `aborted()` flag.
pub(crate) struct Aborted;

impl Future for Aborted {
    type Output = bool;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
        let is_aborted = cx.aborted();
        Poll::Ready(is_aborted)
    }
}

/// Future that passes the wrapped flag to the context's `set_aborted` and
/// completes immediately.
pub(crate) struct SetAborted(pub(crate) bool);

impl Future for SetAborted {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let flag = self.0;
        cx.set_aborted(flag);
        Poll::Ready(())
    }
}

/// Future that calls `stop()` on the polling context and completes immediately.
pub(crate) struct StopWorker;

impl Future for StopWorker {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.stop();
        Poll::Ready(())
    }
}


/// Adapter that drives `future` and maps its output through `post`.
///
/// Once the mapped value has been returned, every further poll yields
/// `Pending` without touching the inner future again.
#[repr(C)]
pub(crate) struct Ready<T, F, R>
where
    T: Future,
    F: FnMut(T::Output) -> R,
{
    // Set after `post` has produced the final value.
    finished: bool,
    future: T,
    post: F,
}

// Declared for every `T`; `poll` re-pins `future` manually.
impl<T: Future, F: FnMut(T::Output) -> R, R> Unpin for Ready<T, F, R> {}

impl<T: Future, F: FnMut(T::Output) -> R, R> Ready<T, F, R> {
    /// Builds the adapter around `future`; `post` is applied to its output.
    pub(crate) fn new(future: T, post: F) -> Self {
        Ready {
            finished: false,
            future,
            post,
        }
    }
}

impl<T: Future, F: FnMut(T::Output) -> R, R> Future for Ready<T, F, R> {
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<R> {
        let this = &mut *self;
        if this.finished {
            return Poll::Pending;
        }

        // SAFETY: `future` is never moved out of `this` by this adapter.
        let inner = unsafe { Pin::new_unchecked(&mut this.future) };
        let value = match T::poll(inner, ctx) {
            Poll::Ready(value) => value,
            Poll::Pending => return Poll::Pending,
        };

        // Run the mapper first; only mark completion once it has returned.
        let mapped = (this.post)(value);
        this.finished = true;
        Poll::Ready(mapped)
    }
}

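// `Pair::ready` packs the progress of both futures into one `u32`, one byte per concern:
//   byte 0 (mask 0x0000FF) - future1: 0x00 not polled yet, 0x01 polled and still pending,
//                            0x11 finished (returned Ready, or was polled once more during abort)
//   byte 1 (mask 0x00FF00) - future2: same encoding, i.e. 0x0100 pending, 0x1100 finished
//   byte 2 (mask 0xFF0000) - buffered outputs: 0x010000 future1's output is stored and valid,
//                            0x100000 future2's output is stored and valid
// Examples: 0x000000 nothing has been polled; 0x000101 both futures pending;
// 0x010111 future1 finished with its output stored while future2 is pending;
// 0x001111 both finished and no output is stored any more.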
/// Storage for the outputs of the two futures driven by a `Pair`.
///
/// Each output has its own slot. The slots must not share memory: `And`
/// buffers both outputs before handing them out together, so writing the
/// second value must leave the first one intact.
///
/// The slots are `MaybeUninit`, so nothing is dropped automatically; the
/// owner tracks which slots are initialised and reads them out to drop them.
struct OutPair<T1, T2> {
    val1: MaybeUninit<T1>,
    val2: MaybeUninit<T2>,
}

impl<T1, T2> OutPair<T1, T2> {
    /// Creates storage with both slots uninitialised.
    fn new() -> Self {
        Self {
            val1: MaybeUninit::uninit(),
            val2: MaybeUninit::uninit(),
        }
    }

    /// Stores the first output. A value already present in the slot is
    /// overwritten without being dropped.
    fn write_1(&mut self, val: T1) {
        self.val1.write(val);
    }

    /// Stores the second output. A value already present in the slot is
    /// overwritten without being dropped.
    fn write_2(&mut self, val: T2) {
        self.val2.write(val);
    }

    /// Moves the first output out of its slot.
    ///
    /// # Safety
    /// The slot must have been filled by `write_1` and not read since;
    /// reading twice duplicates ownership of the value.
    unsafe fn read_1(&self) -> T1 {
        unsafe { self.val1.assume_init_read() }
    }

    /// Moves the second output out of its slot.
    ///
    /// # Safety
    /// The slot must have been filled by `write_2` and not read since;
    /// reading twice duplicates ownership of the value.
    unsafe fn read_2(&self) -> T2 {
        unsafe { self.val2.assume_init_read() }
    }
}

/// State shared by the two-future combinators (`Seq`, `Or`, `And`): both
/// futures, buffered outputs, and the packed progress flags in `ready`.
#[repr(C)]
struct Pair<T1: Future, T2: Future> {
    // Byte 0: future1 state, byte 1: future2 state, byte 2: which outputs are stored.
    ready: u32,
    future1: T1,
    output: OutPair<T1::Output, T2::Output>,
    future2: T2,
}

impl<T1: Future, T2: Future> Pair<T1, T2> {
    // future1 occupies the lowest byte of `ready`.
    const FIRST_MASK: u32 = 0xFF;
    const FIRST_PENDING: u32 = 0x01;
    const FIRST_DONE: u32 = 0x11;
    // future2 occupies the second byte of `ready`.
    const SECOND_MASK: u32 = 0xFF00;
    const SECOND_PENDING: u32 = 0x0100;
    const SECOND_DONE: u32 = 0x1100;
    // Third byte: an output is currently held in `output`.
    const FIRST_STORED: u32 = 0x010000;
    const SECOND_STORED: u32 = 0x100000;

    /// Creates a pair in the "nothing polled yet" state.
    fn new(future1: T1, future2: T2) -> Self {
        Pair {
            ready: 0,
            future1,
            output: OutPair::new(),
            future2,
        }
    }

    /// Buffers future1's output and flags it as stored.
    fn write_1(&mut self, val: T1::Output) {
        self.output.write_1(val);
        self.ready |= Self::FIRST_STORED;
    }

    /// Buffers future2's output and flags it as stored.
    fn write_2(&mut self, val: T2::Output) {
        self.output.write_2(val);
        self.ready |= Self::SECOND_STORED;
    }

    /// Takes future1's buffered output, clearing its flag; `None` when nothing is stored.
    fn read_1(&mut self) -> Option<T1::Output> {
        if (self.ready & Self::FIRST_STORED) == 0 {
            return None;
        }
        self.ready &= !Self::FIRST_STORED;
        // SAFETY: the flag was set, so `write_1` filled the slot and it has not been read since.
        Some(unsafe { self.output.read_1() })
    }

    /// Takes future2's buffered output, clearing its flag; `None` when nothing is stored.
    fn read_2(&mut self) -> Option<T2::Output> {
        if (self.ready & Self::SECOND_STORED) == 0 {
            return None;
        }
        self.ready &= !Self::SECOND_STORED;
        // SAFETY: the flag was set, so `write_2` filled the slot and it has not been read since.
        Some(unsafe { self.output.read_2() })
    }

    /// Winds both futures down. A future that is still pending is polled one
    /// more time (its result is ignored) and marked finished; otherwise any
    /// buffered output of that future is dropped.
    fn abort(&mut self, ctx: &mut Context<'_>) {
        if (self.ready & Self::FIRST_MASK) == Self::FIRST_PENDING {
            let first = unsafe { Pin::new_unchecked(&mut self.future1) };
            let _ = T1::poll(first, ctx);
            self.ready |= Self::FIRST_DONE;
        } else {
            drop(self.read_1());
        }

        if (self.ready & Self::SECOND_MASK) == Self::SECOND_PENDING {
            let second = unsafe { Pin::new_unchecked(&mut self.future2) };
            let _ = T2::poll(second, ctx);
            self.ready |= Self::SECOND_DONE;
        } else {
            drop(self.read_2());
        }
    }

    /// Runs `abort` with the context's aborted flag raised, then clears the flag again.
    fn force_abort(&mut self, ctx: &mut Context<'_>) {
        ctx.set_aborted(true);
        self.abort(ctx);
        ctx.set_aborted(false);
    }
}

impl<T1: Future, T2: Future> Drop for Pair<T1, T2> {
    /// Drops any output that is still buffered.
    fn drop(&mut self) {
        drop(self.read_1());
        drop(self.read_2());
    }
}

/// Combines two futures into one and runs them in order.
/// For example, the `delay` strategy is implemented as `Seq<Sleep, T>`.
pub(crate) struct Seq<T1: Future, T2: Future>(Pair<T1, T2>);

impl<T1: Future, T2: Future> Unpin for Seq<T1, T2> {}

impl<T1: Future, T2: Future> Seq<T1, T2> {
    /// Creates a sequence that runs `future1` to completion before starting `future2`.
    pub(crate) fn new(future1: T1, future2: T2) -> Self {
        Seq(Pair::new(future1, future2))
    }
}

impl<T1: Future, T2: Future> Future for Seq<T1, T2> {
    type Output = (T1::Output, T2::Output);

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let pair = &mut self.0;

        if ctx.aborted() {
            pair.abort(ctx);
            return Poll::Pending;
        }

        // First poll ever: mark future1 as started.
        if pair.ready == 0 {
            pair.ready = 0x0001;
        }

        // Stage one: drive future1 until it yields its output, then buffer it.
        if pair.ready == 0x0001 {
            let first = unsafe { Pin::new_unchecked(&mut pair.future1) };
            match T1::poll(first, ctx) {
                Poll::Ready(value) => {
                    // future1 finished, future2 now counts as started.
                    pair.ready = 0x0111;
                    pair.write_1(value);
                }
                Poll::Pending => return Poll::Pending,
            }
        }

        // Stage two only runs while future1 is finished and future2 is still pending.
        if (pair.ready & 0xFFFF) != 0x0111 {
            return Poll::Pending;
        }

        let second = unsafe { Pin::new_unchecked(&mut pair.future2) };
        match T2::poll(second, ctx) {
            Poll::Ready(value) => {
                pair.ready |= 0x1111;
                Poll::Ready((pair.read_1().unwrap(), value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Polls two futures side by side and completes with the output of whichever
/// finishes first. The other future, if it has already been polled, is wound
/// down through `Pair::force_abort`.
pub(crate) struct Or<T1: Future, T2: Future>(Pair<T1, T2>);

impl<T1: Future, T2: Future> Unpin for Or<T1, T2> {}

impl<T1: Future, T2: Future> Or<T1, T2> {
    /// Creates the combinator; `future1` is always polled before `future2`.
    pub(crate) fn new(future1: T1, future2: T2) -> Self {
        Self(Pair::new(future1, future2))
    }
}

impl<T1: Future, T2: Future> Future for Or<T1, T2> {
    type Output = (Option<T1::Output>, Option<T2::Output>);

    /// Returns `(Some(v), None)` when `future1` wins and `(None, Some(v))` when
    /// `future2` wins. When both are ready in the same poll, `future1` wins
    /// because it is polled first. After completion (or an abort) every further
    /// poll returns `Pending`.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if ctx.aborted() {
            self.0.abort(ctx);
            return Poll::Pending;
        }

        match self.0.ready {
            // Terminal states: future1 won before future2 was ever polled (0x0011),
            // or both futures have been finished / wound down (0x1111).
            0x0011 | 0x1111 => return Poll::Pending,
            // First poll: mark future1 as started.
            0x0000 => self.0.ready = 0x0001,
            _ => {}
        }

        let pinned1 = unsafe { Pin::new_unchecked(&mut self.0.future1) };

        if let Poll::Ready(v) = Future::poll(pinned1, ctx) {
            // Mark future1 finished, then wind down future2 if it was already polled.
            self.0.ready |= 0x0011;
            self.0.force_abort(ctx);
            return Poll::Ready((Some(v), None));
        }

        let pinned2 = unsafe { Pin::new_unchecked(&mut self.0.future2) };
        if let Poll::Ready(v) = Future::poll(pinned2, ctx) {
            // future2 finished while future1 is still pending; wind future1 down.
            self.0.ready = 0x1101;
            self.0.force_abort(ctx);
            Poll::Ready((None, Some(v)))
        } else {
            // Both futures have been polled and are still pending.
            self.0.ready = 0x0101;
            Poll::Pending
        }
    }
}

/// Polls two futures side by side and completes once both have finished,
/// yielding both outputs together.
pub(crate) struct And<T1: Future, T2: Future>(Pair<T1, T2>);

impl<T1: Future, T2: Future> Unpin for And<T1, T2> {}

impl<T1: Future, T2: Future> And<T1, T2> {
    /// Creates the combinator; on each poll `future1` is polled before `future2`.
    pub(crate) fn new(future1: T1, future2: T2) -> Self {
        And(Pair::new(future1, future2))
    }
}

impl<T1: Future, T2: Future> Future for And<T1, T2> {
    type Output = (T1::Output, T2::Output);

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let pair = &mut self.0;

        if ctx.aborted() {
            pair.abort(ctx);
            return Poll::Pending;
        }

        // First poll ever: mark both futures as started.
        if pair.ready == 0 {
            pair.ready = 0x0101;
        }

        // Drive future1 while it is still pending; buffer its output when it finishes.
        if (pair.ready & 0xFF) == 0x01 {
            let first = unsafe { Pin::new_unchecked(&mut pair.future1) };
            if let Poll::Ready(value) = T1::poll(first, ctx) {
                pair.ready |= 0x11;
                pair.write_1(value);
            }
        }

        // Same for future2.
        if (pair.ready & 0xFF00) == 0x0100 {
            let second = unsafe { Pin::new_unchecked(&mut pair.future2) };
            if let Poll::Ready(value) = T2::poll(second, ctx) {
                pair.ready |= 0x1100;
                pair.write_2(value);
            }
        }

        // Both finished and both outputs buffered: hand them out together.
        if pair.ready != 0x00111111 {
            return Poll::Pending;
        }
        let first_output = pair.read_1().unwrap();
        let second_output = pair.read_2().unwrap();
        Poll::Ready((first_output, second_output))
    }
}

/// Provides combinators for composing futures in various ways.
pub trait Extensions {
    /// Runs the future only after the given delay has elapsed.
    fn delay(self, timeout: Duration) -> impl Future<Output = Self::Output>
    where
        Self: Future;
    /// If the future has not completed within the given (relative) time, it is
    /// terminated early and `None` is returned.
    fn deadline(self, timeout: Duration) -> impl Future<Output = Option<Self::Output>>
    where
        Self: Future;
    /// Runs the two futures sequentially and returns both outputs together once
    /// both have completed.
    fn seq<T>(self, future: T) -> impl Future<Output = (Self::Output, T::Output)>
    where
        Self: Future,
        T: Future;
    /// Runs the two futures concurrently and returns as soon as either completes.
    /// Note that if the other one has already started but not finished, an abort
    /// is triggered for it; see `TaskContext`.
    fn or<T>(self, future: T) -> impl Future<Output = (Option<Self::Output>, Option<T::Output>)>
    where
        Self: Future,
        T: Future;
    /// Runs the two futures concurrently and returns only after both have completed.
    fn and<T>(self, future: T) -> impl Future<Output = (Self::Output, T::Output)>
    where
        Self: Future,
        T: Future;
    /// Equivalent to calling `Poll::map`, but usable from non-async functions whose
    /// return type is `impl Future`.
    fn ready<F: FnMut(Self::Output) -> R, R>(self, f: F) -> impl Future<Output = R>
    where
        Self: Future;

    /// Removes the per-call register / unregister work of `wait_event(AioFd::wait)`:
    /// registration happens only once over the whole lifetime of the async task.
    /// Combined with POLLET this can improve I/O efficiency.
    /// `N`: the maximum number of fds used.
    #[deprecated(note = "不再需要显示调用，一个异步任务缺省支持32个fd，如果的确需要更多，可利用runtime::task::spawn_with中的参数attr指定fdset的最大值")]
    fn wait_entry<const N: usize>(self) -> impl Future<Output = Self::Output>
    where
        Self: Future;
}

/// Blanket implementation: every future gets the combinators.
impl<T: Future> Extensions for T {
    /// Sleeps for `timeout` first, then runs `self`; only `self`'s output is kept.
    fn delay(self, timeout: Duration) -> impl Future<Output = T::Output> {
        let sleep_then_run = Seq::new(Sleep::new(timeout), self);
        Ready::new(sleep_then_run, |(_, output)| output)
    }

    /// Races `self` against a sleep of `timeout`; yields `self`'s half of the
    /// `Or` result, which is `None` when the sleep finished first.
    fn deadline(self, timeout: Duration) -> impl Future<Output = Option<T::Output>> {
        let race = Or::new(self, Sleep::new(timeout));
        Ready::new(race, |(output, _)| output)
    }

    /// Runs `self`, then `future`, via `Seq`.
    fn seq<F>(self, future: F) -> impl Future<Output = (T::Output, F::Output)>
    where
        F: Future,
    {
        Seq::new(self, future)
    }

    /// Races `self` against `future` via `Or`.
    fn or<F>(self, future: F) -> impl Future<Output = (Option<T::Output>, Option<F::Output>)>
    where
        F: Future,
    {
        Or::new(self, future)
    }

    /// Waits for both `self` and `future` via `And`.
    fn and<F>(self, future: F) -> impl Future<Output = (T::Output, F::Output)>
    where
        F: Future,
    {
        And::new(self, future)
    }

    /// Maps `self`'s output through `f` via `Ready`.
    fn ready<F: FnMut(T::Output) -> R, R>(self, f: F) -> impl Future<Output = R> {
        Ready::new(self, f)
    }

    /// Returns `self` unchanged; `N` is not used.
    fn wait_entry<const N: usize>(self) -> impl Future<Output = T::Output>
    where
        Self: Future,
    {
        self
    }
}

/// Future that calls `set_affinity()` on the polling context and completes immediately.
pub(crate) struct SetAffinity;

impl Future for SetAffinity {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.set_affinity();
        Poll::Ready(())
    }
}
